/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Internal cleaner that removes the completed procedure results after a TTL.
 * <p/>
 * NOTE: This is a special case handled in timeoutLoop().
 * <p/>
 * Since the client code looks more or less like:
 *
 * <pre>
 *   procId = master.doOperation()
 *   while (master.getProcResult(procId) == ProcInProgress);
 * </pre>
 *
 * The master should not throw away the proc result as soon as the procedure is done but should wait
 * a result request from the client (see executor.removeResult(procId)) The client will call
 * something like master.isProcDone() or master.getProcResult() which will return the result/state
 * to the client, and it will mark the completed proc as ready to delete. note that the client may
 * not receive the response from the master (e.g. master failover) so, if we delay a bit the real
 * deletion of the proc result the client will be able to get the result the next try.
 */
@InterfaceAudience.Private
class CompletedProcedureCleaner<TEnvironment> extends ProcedureInMemoryChore<TEnvironment> {
    private static final Logger LOG = LoggerFactory.getLogger(CompletedProcedureCleaner.class);

    static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
    private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000; // 30sec

    private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size";
    private static final int DEFAULT_BATCH_SIZE = 32;

    private final Map<Long, CompletedProcedureRetainer<TEnvironment>> completed;
    private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
    private final ProcedureStore store;
    private final IdLock procExecutionLock;
    private Configuration conf;

    public CompletedProcedureCleaner(Configuration conf, ProcedureStore store, IdLock procExecutionLock,
            Map<Long, CompletedProcedureRetainer<TEnvironment>> completedMap, Map<NonceKey, Long> nonceKeysToProcIdsMap) {
        // set the timeout interval that triggers the periodic-procedure
        super(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
        this.completed = completedMap;
        this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
        this.store = store;
        this.procExecutionLock = procExecutionLock;
        this.conf = conf;
    }

    @Override
    protected void periodicExecute(final TEnvironment env) {
        if(completed.isEmpty()) {
            if(LOG.isTraceEnabled()) {
                LOG.trace("No completed procedures to cleanup.");
            }
            return;
        }

        final long evictTtl = conf.getInt(ProcedureExecutor.EVICT_TTL_CONF_KEY, ProcedureExecutor.DEFAULT_EVICT_TTL);
        final long evictAckTtl = conf.getInt(ProcedureExecutor.EVICT_ACKED_TTL_CONF_KEY, ProcedureExecutor.DEFAULT_ACKED_EVICT_TTL);
        final int batchSize = conf.getInt(BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);

        final long[] batchIds = new long[batchSize];
        int batchCount = 0;

        final long now = EnvironmentEdgeManager.currentTime();
        final Iterator<Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>>> it = completed.entrySet().iterator();
        while(it.hasNext() && store.isRunning()) {
            final Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>> entry = it.next();
            final CompletedProcedureRetainer<TEnvironment> retainer = entry.getValue();
            final Procedure<?> proc = retainer.getProcedure();
            IdLock.Entry lockEntry;
            try {
                lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
            } catch(IOException e) {
                // can only happen if interrupted, so not a big deal to propagate it
                throw new UncheckedIOException(e);
            }
            try {
                // TODO: Select TTL based on Procedure type
                if(retainer.isExpired(now, evictTtl, evictAckTtl)) {
                    // Failed procedures aren't persisted in WAL.
                    if(!(proc instanceof FailedProcedure)) {
                        batchIds[batchCount++] = entry.getKey();
                        if(batchCount == batchIds.length) {
                            store.delete(batchIds, 0, batchCount);
                            batchCount = 0;
                        }
                    }
                    final NonceKey nonceKey = proc.getNonceKey();
                    if(nonceKey != null) {
                        nonceKeysToProcIdsMap.remove(nonceKey);
                    }
                    it.remove();
                    LOG.trace("Evict completed {}", proc);
                }
            } finally {
                procExecutionLock.releaseLockEntry(lockEntry);
            }
        }
        if(batchCount > 0) {
            store.delete(batchIds, 0, batchCount);
        }
    }
}